A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
The general trend is to use DataFrames instead of RDDs.
One of the main benefits of the DataFrame approach is that it is easier to use and more user-friendly than the RDD one. The RDD API is still available, but it has been put into maintenance mode: it will no longer be extended, and it will be deprecated once the DataFrame API reaches feature parity with it.
A DataFrame is a Spark Dataset (in short, a distributed, strongly-typed collection of data; the interface was introduced in Spark 1.6) organized into named columns (which represent the variables).
What are the main selling points and benefits of using the DataFrame API over the older RDD one? Here are a few: it is easier to use, the data is organized into named columns with a known schema, and operations on it run through Spark SQL’s optimized execution engine.
In [ ]:
from pyspark.sql import SparkSession
import pyspark
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
We have multiple possible sources from which we can create a DataFrame. To load a Dataset from an external source we use the DataFrameReader interface.
The DataFrameReader for the session can be obtained through the read property of the SparkSession instance.
We can add input options for the underlying data source by calling the option method on the reader instance; it takes a key and a value as arguments (the options method sets several key/value pairs at once).
There are two approaches to loading the data: the generic load method, where the source format is specified explicitly with format (parquet is the default), and the format-specific shortcut methods such as csv, json and parquet.
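As a quick illustration of the generic approach, here is a minimal sketch that reads the same JSON file that is read with the shortcut method further below:
In [ ]:
# Generic approach: specify the source format explicitly, then load the path
df_generic = spark.read.format("json").load("../data/people.json")
df_generic.show()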
Here are the most common use cases when it comes to creating a DataFrame and the method used:
In [ ]:
df1 = spark.read.load("../data/users.parquet")
In [ ]:
df1.show()
In [ ]:
df2 = spark.read.csv("../data/people.csv")
In [ ]:
df2.show()
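Note that, by default, the csv reader does not expect a header row and reads every column as a string (named _c0, _c1, ...). If the file does contain a header, reader options can be passed through option; the following is a sketch, assuming ../data/people.csv actually starts with a header line:
In [ ]:
# csv shortcut with reader options: use the first line as column names
# and let Spark infer the column types (both options are illustrative here)
df2_opts = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("../data/people.csv")
df2_opts.show()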
In [ ]:
df3 = spark.read.json("../data/people.json")
In [ ]:
df3.show()
From Hive: Apache Hive is a data warehouse software package. For interfacing DataFrames with Hive we need a SparkSession with Hive support enabled and all the needed dependencies on the classpath so that Spark can load them automatically.
We will not cover interfacing with Hive storage here, as that would require a deeper understanding of what Hive is and how it works. For more information about the topic please consult the official documentation on the subject.
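For reference only, enabling Hive support comes down to one extra call on the session builder. The snippet below is left commented out as a sketch (the app name and warehouse location are just examples, and running it requires the Hive dependencies mentioned above):
In [ ]:
# Example only (commented out): a SparkSession with Hive support enabled
# spark_hive = SparkSession.builder \
#     .appName("Python Spark SQL Hive example") \
#     .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
#     .enableHiveSupport() \
#     .getOrCreate()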
In [ ]:
#rdd = spark.sparkContext.parallelize([{"a":[1,2,3,4],"b":[1,2,3,4]}])
rdd = spark.sparkContext.parallelize([pyspark.sql.Row(a=1, b=1),
                                      pyspark.sql.Row(a=2, b=2),
                                      pyspark.sql.Row(a=3, b=3),
                                      pyspark.sql.Row(a=4, b=4)])
df4 = rdd.toDF()
df4.show()
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.
As mentioned above, in Spark 2.0 DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as “untyped transformations”, in contrast to the “typed transformations” that come with strongly typed Scala/Java Datasets.
Here we include some basic examples of structured data processing using Datasets:
In [ ]:
df = spark.read.json("../data/people.json")
# Print the schema in a tree format
df.printSchema()
In [ ]:
# Select only the "name" column
df.select("name").show()
In [ ]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
In [ ]:
# Select people older than 21
df.filter(df['age'] > 21).show()
In [ ]:
# Count people by age
df.groupBy("age").count().show()
In [ ]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes.
Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.
The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
In [ ]:
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are DataFrame objects.
# The rdd attribute returns the content as a pyspark.RDD of Row objects.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps: create an RDD of tuples or lists from the original RDD; create the schema, represented by a StructType, matching the structure of those tuples or lists; and apply the schema to the RDD via the createDataFrame method provided by the SparkSession.
For example:
In [ ]:
# Import data types
from pyspark.sql.types import *
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+